package com.ruyuan.little.project.rocketmq.api.hotel.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.dto.CommonResponse;
import com.ruyuan.little.project.common.enums.ErrorCodeEnum;
import com.ruyuan.little.project.common.enums.LittleProjectTypeEnum;
import com.ruyuan.little.project.redis.api.RedisApi;
import com.ruyuan.little.project.rocketmq.admin.dto.AdminHotelRoom;
import com.ruyuan.little.project.rocketmq.api.hotel.dto.HotelRoom;
import com.ruyuan.little.project.rocketmq.api.hotel.dto.HotelRoomMessage;
import com.ruyuan.little.project.rocketmq.api.hotel.service.impl.HotelRoomCacheManager;
import com.ruyuan.little.project.rocketmq.common.constants.RedisKeyConstant;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

@Component
public class HotelRoomUpdateMessageListener implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(HotelRoomUpdateMessageListener.class);

    @Reference(version = "1.0.0",
            interfaceClass = RedisApi.class,
            cluster = "failfast",check = false)
    private RedisApi redisApi;

    @Resource
    private HotelRoomCacheManager hotelRoomCacheManager;

    /**
     * 房间更新消息处理
     * 1. 主要就是更新JVM 房间数据的缓存
     * @param msgs
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        for (MessageExt msg : msgs) {

            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            try {
                HotelRoomMessage hotelRoomMessage = JSON.parseObject(body, HotelRoomMessage.class);
                Long roomId = hotelRoomMessage.getRoomId();

                LOGGER.info("receiver room update message roomId:{}", roomId);
                LOGGER.info("start query hotel room from redis cache param:{}", roomId);

                CommonResponse<String> commonResponse = redisApi.get(RedisKeyConstant.HOTEL_ROOM_KEY_PREFIX + roomId, hotelRoomMessage.getPhoneNumber(), LittleProjectTypeEnum.ROCKETMQ);

                if (Objects.equals(commonResponse.getCode(), ErrorCodeEnum.SUCCESS.getCode())) {
                    LOGGER.info("update hotel room local cache data:{}", commonResponse.getData());
                    hotelRoomCacheManager.updateLocalCache(roomId, JSON.parseObject(commonResponse.getData(), HotelRoom.class));
                }
            } catch (Exception e) {
                LOGGER.info("received hotel room update message:{}, consumer fail", body);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
